Kubernetes 源码笔记(kube-proxy)

kube-proxy 运行在 Kubernetes 集群的计算节点上,负责 Service 的负载均衡及服务代理。

原理

kube-proxy 工作的核心的流程是从 kube-apiserver 同步 service 和 endpoint 的信息,然后将其更新到 iptables。从 1.11 版本开始,基于 IPVS 的负载均衡已经 GA 了,可以预见的是未来 k8s 会主推 IPVS 模式。

kube-proxy 目前有3种常见的 proxyMode,分别是 userspace,iptables,ipvs,还有一种是 Windows 平台的 kernelspace。其中 userspace mode 是 v1.0 及以前版本的默认模式。从 v1.1 版本开始,增加了 iptables mode,在 v1.3版本中正式替代了 userspace 模式成为默认模式(需要 iptables 的版本>= 1.4.11)。在实践中,社区发现 iptables 的扩展性和性能都不佳,毕竟 iptables 是为防火墙设计的,它的特性并不适合当做大规模服务的负载均衡,华为贡献了基于 IPVS 实现 kube-proxy 的特性。IPVS 是 LVS 的负载均衡模块,同样基于 netfilter,但比 iptables 性能更好,具备更好的可扩展性,鉴于 IPVS 是 kube-proxy 的未来趋势,在阅读 kube-proxy 源码的时候可以着重看这部分。

下面我们分别来看 kube-proxy 的三种模式的实现。对于 userspace,iptables 这两种模式,本文只做简述,对于 ipvs 模式,会结合其设计文档,代码进行分析。

userspace mode

基于用户态的 proxy,service 的请求会先从用户空间进入内核 iptables,然后再回到用户空间,由 kube-proxy 完成后端 endpoints 的选择和代理工作,这种方式流量从用户空间进出内核带来的性能损耗比较大。原理如下图:

kube-proxy-userspace-mode

示例

摘自 kubernetes入门之kube-proxy实现原理

现在有一个 service ssh-service1

1
2
3
4
$ kubectl get service
NAME LABELS SELECTOR IP(S) PORT(S)
kubernetes component=apiserver,provider=kubernetes <none> 10.254.0.1 443/TCP
ssh-service1 name=ssh,role=service ssh-service=true 10.254.132.107 2222/TCP

这个 service 的 cluster ip 是 10.254.132.107。

1
2
3
4
5
6
7
8
9
10
11
12
$ kubectl describe service ssh-service1 
Name: ssh-service1
Namespace: default
Labels: name=ssh,role=service
Selector: ssh-service=true
Type: LoadBalancer
IP: 10.254.132.107
Port: <unnamed> 2222/TCP
NodePort: <unnamed> 30239/TCP
Endpoints: <none>
Session Affinity: None
No events.

这时候 iptables 的规则是这样的:

1
2
3
4
5
6
$ sudo iptables -S -t nat
...
-A KUBE-NODEPORT-CONTAINER -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j REDIRECT --to-ports 36463
-A KUBE-NODEPORT-HOST -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j DNAT --to-destination 10.0.0.5:36463
-A KUBE-PORTALS-CONTAINER -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j REDIRECT --to-ports 36463
-A KUBE-PORTALS-HOST -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j DNAT --to-destination 10.0.0.5:36463

这个 node 的 ip 是 10.0.0.5,那么访问 10.0.0.5:30239 就会被转发到本机的 36463 端口,在访问 10.254.132.107:2222 时,也会转发到 36463 端口,36463 端口是由 kube-proxy 监听的,流量接着会被导到后端的 pod 上。

iptables mode

iptables 的方式是完全通过内核的 iptables 实现 service 的代理和 LB,这是 v1.2 及以后版本的默认模式,原理如下图:

kube-proxy-iptables-mode

这种方式通过 iptable NAT 完成转发,也有一定的性能损耗;此外,iptables 没有增量更新的功能,如果更新一条规则需要整体刷新,时间长,而且对服务的稳定性也有影响;iptable 是串行的,一个 node 上如果有很多的 iptables 规则,流量需要经过所有的匹配再进行转发,在服务规模比较大的情况下对时间、CPU、内存都有比较大的消耗。这导致的一个结果是,大型企业将 k8s 用于生产时,不会直接 kube-proxy 作为服务代理,而是使用 NodePort 或使用 externalIP(比如这篇文章中的例子),或自己开发,配置负载均衡代替 kube-proxy。

示例

摘自 kubernetes入门之kube-proxy实现原理

创建 mysql-service 的 service:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: v1
kind: Service
metadata:
labels:
name: mysql
role: service
name: mysql-service
spec:
ports:
- port: 3306
targetPort: 3306
nodePort: 30964
type: NodePort
selector:
mysql-service: "true"

这个服务的 cluster ip 是 10.254.162.44,代理的两个 pod 的 ip 是 192.168.125.129 和192.168.125.131,再看 iptables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ iptables -S -t nat
...
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM
-A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306
-A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306
-A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T

如果通过 node 的 30964 端口访问,匹配的是下面两条链:

1
2
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM

如果直接访问 cluster ip (10.254.162.44),匹配的是下面的规则:

1
-A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM

上述的两种匹配都会跳转到 KUBE-SVC-67RL4FN6JRUPOJYM 的链。

1
2
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T

50% 的概率匹配 KUBE-SEP-ID6YWIT3F6WNZ47P,50% 的概率匹配 KUBE-SEP-IN2YML2VIFH5RO2T。

KUBE-SEP-ID6YWIT3F6WNZ47P 的作用是通过 DNAT 发送到192.168.125.129的3306端口,KUBE-SEP-IN2YML2VIFH5RO2T 同理,发送的是192.168.125.131的3306端口。

1
2
3
4
-A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306
-A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306

IPVS

如果是使用 Kubernetes 1.8 或更高版本,可以使用 ipvs 模式,它是对 iptables 的替换。ipvs 模式下增加规则是增量式的,不会强制全量刷新,匹配服务时也不会进行串行匹配,而是通过一定规则进行哈希匹配,以找到相应的规则。

相比于 iptables,它具备更高的性能和稳定性。下图是华为云提供的压测数据:

ipvs_vs_iptables

IPVS 是 LVS 的一个核心软件模块,所以我们先介绍 LVS。LVS 是 Linux VIrtual Server 的缩写,这是由章文嵩博士发起的项目,目前已经合并到了 Linux 内核中。

IPVS 是 LVS 的 IP 负载均衡模块,安装在 LVS 集群作为负载均衡的主节点。

LVS 集群中有 Director 和 Real Server 两个角色,有三种类型的 IP 地址:

  • Director Virtual IP,调度器用于与客户端通信的 IP 地址,简称为 VIP
  • Director IP,调度器用于与 Real Server 通信的 IP 地址,简称为 DIP
  • Real IP,后端主机与调度器通信的 IP 地址,简称为 RIP

LVS 有三种调度模式:

LVS-NAT Network Address Transform

lvs-nat

LVS-TUN IPTuneling

lvs-tun

LVS-DR Direct Routing

lvs-dr

目前 kube-proxy 的实现选择的是 NAT 模式。

LVS 有十种调度算法:

静态方法,根据算法本身进行轮询调度

  • RR, Round Robin
  • WRR,Wrighted RR
  • SH,SourceIP Hash
  • DH,Destination Hash

动态方法,根据算法以及 RS 的当前负载状态进行调度

  • LC,least connections
  • WLC,Weighted Least Connection
  • SED,Shortest Expection Delay
  • NQ,Never Queue
  • LBLC,Locality-Based Least Connection
  • LBLCR,Locality-Based Least Connections withReplication

kube-proxy 可以通过 --ipvs-scheduler 参数选择调度算法,默认情况下是 Round Robin 算法。

关于 LVS 的详细介绍,可以查阅 Linux服务器集群系统(一)

创建一个 service 后,k8s 会在每个节点上创建一个网卡,同时绑定在 Service IP(VIP)上,这时内核会认为 VIP 就是本机 IP,通过 socket 调用,创建 IPVS 的 virtual server 和 real server,分别对应 k8s 的 Service 和 Endpoints。socket 的调用由 docker 公司的 libnetwork 库完成。

代码解析

了解了 kube-proxy 的原理,再阅读代码就很容易理解了。代码版本是 v1.12.2-beta.0。

Run()

Run 方法在 cmd/kube-proxy/proxy.go 中。

1
2
3
4
5
6
7
8
9
10
11
12
func (o *Options) Run() error {
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}

proxyServer, err := NewProxyServer(o)
if err != nil {
return err
}

return proxyServer.Run()
}

逻辑很简单。首先通过 NewProxyServer 构造一个 ProxyServer,然后调用它的 Run 方法来运行。

初始化 ProxyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
cleanupAndExit bool,
cleanupIPVS bool,
scheme *runtime.Scheme,
master string) (*ProxyServer, error) {

if config == nil {
return nil, errors.New("config is required")
}

if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}

protocol := utiliptables.ProtocolIpv4
if net.ParseIP(config.BindAddress).To4() == nil {
glog.V(0).Infof("IPv6 bind address (%s), assume IPv6 operation", config.BindAddress)
protocol = utiliptables.ProtocolIpv6
}

var iptInterface utiliptables.Interface
var ipvsInterface utilipvs.Interface
var kernelHandler ipvs.KernelHandler
var ipsetInterface utilipset.Interface
var dbus utildbus.Interface

// Create a iptables utils.
execer := exec.New()

dbus = utildbus.New()
iptInterface = utiliptables.New(execer, dbus, protocol)
kernelHandler = ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
if canUseIPVS {
ipvsInterface = utilipvs.New(execer)
}

// We omit creation of pretty much everything if we run in cleanup mode
if cleanupAndExit {
return &ProxyServer{
execer: execer,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
CleanupAndExit: cleanupAndExit,
}, nil
}

client, eventClient, err := createClients(config.ClientConnection, master)
if err != nil {
return nil, err
}

// Create event recorder
hostname, err := utilnode.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

var healthzServer *healthcheck.HealthzServer
var healthzUpdater healthcheck.HealthzUpdater
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
healthzUpdater = healthzServer
}

var proxier proxy.ProxyProvider
var serviceEventHandler proxyconfig.ServiceHandler
var endpointsEventHandler proxyconfig.EndpointsHandler

proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
nodeIP := net.ParseIP(config.BindAddress)
if nodeIP.IsUnspecified() {
nodeIP = getNodeIP(client, hostname)
}
if proxyMode == proxyModeIPTables {
// ...
} else if proxyMode == proxyModeIPVS {
glog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier(
iptInterface,
ipvsInterface,
ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPVS.ExcludeCIDRs,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
config.ClusterCIDR,
hostname,
nodeIP,
recorder,
healthzServer,
config.IPVS.Scheduler,
config.NodePortAddresses,
)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
metrics.RegisterMetrics()
proxier = proxierIPVS
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
glog.V(0).Info("Tearing down inactive rules.")
// TODO this has side effects that should only happen when Run() is invoked.
userspace.CleanupLeftovers(iptInterface)
iptables.CleanupLeftovers(iptInterface)
} else {
glog.V(0).Info("Using userspace Proxier.")
// ...
}

iptInterface.AddReloadFunc(proxier.Sync)

return &ProxyServer{
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
IpvsInterface: ipvsInterface,
IpsetInterface: ipsetInterface,
execer: execer,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ConntrackConfiguration: config.Conntrack,
Conntracker: &realConntracker{},
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
ServiceEventHandler: serviceEventHandler,
EndpointsEventHandler: endpointsEventHandler,
HealthzServer: healthzServer,
}, nil
}

以上代码的基本流程是:

  1. 创建 iptables 的接口 iptInterface 以及 ipvs 的接口 ipvsInterface
  2. 根据上面的字段初始化 ProxyServer
  3. 创建健康检查服务
  4. 初始化 serviceEventHandlerendpointsEventHandler,用于定义 service 和 endpoints 的发生变化后的处理方法。
  5. 根据 proxyMode 进入不同的分支
  6. 如果是使用 ipvs 模式,调用 ./pkg/proxy/ipvs/proxier.goNewProxier 进行初始化。iptable, userspace 模式同理

需要注意的是调用 NewProxier 过程中,有 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) 这么一行,用来初始化 syncRunner,这里很重要,在下面我们会看到。

运行 ProxyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (s *ProxyServer) Run() error {
// glog ...
// remove iptables rules and exit
if s.CleanupAndExit {
encounteredError := userspace.CleanupLeftovers(s.IptInterface)
encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError
encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface, s.CleanupIPVS) || encounteredError
if encounteredError {
return errors.New("encountered an error while tearing down rules.")
}
return nil
}

// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
glog.V(2).Info(err)
}
}

if len(s.ResourceContainer) != 0 {
if err := resourcecontainer.RunInResourceContainer(s.ResourceContainer); err != nil {
glog.Warningf("Failed to start in resource-only container %q: %v", s.ResourceContainer, err)
} else {
glog.V(2).Infof("Running in resource-only container %q", s.ResourceContainer)
}
}

if s.Broadcaster != nil && s.EventClient != nil {
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
}

// Start up a healthz server if requested
// ...

// Start up a metrics server if requested
// ...

// Tune conntrack, if requested
// Conntracker is always nil for windows
// ...
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)

s.birthCry()

s.Proxier.SyncLoop()
return nil
}

如果运行时带了 CleanupAndExit 参数,清理所有的 iptables,ipvs 的规则然后退出。OOM adjuster 和 ResourceContainer 目前都没有完整实现,先跳过,接着启动健康检查服务,metrics 服务器,用于监控,最后启动 informer,注册 service,endpoint 的监听事件,这两个 handler 都是之前注册的 proxierIPVS,当监听到 service,endpoint 的变化,就会触发响应的方法,注意到 ProxyServer 中定义的 ServiceEventHandlerEndpointsEventHandler 分别是 ServiceHandlerEndpointsHandler 类型,实际上定义的是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
type ServiceHandler interface {
OnServiceAdd(service *v1.Service)
OnServiceUpdate(oldService, service *v1.Service)
OnServiceDelete(service *v1.Service)
OnServiceSynced()
}

type EndpointsHandler interface {
OnEndpointsAdd(endpoints *v1.Endpoints)
OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
OnEndpointsDelete(endpoints *v1.Endpoints)
OnEndpointsSynced()
}

pkg/proxy/ipvs/proxier.goProxier 中实现了这些方法,可以一一对照查看。Run 方法最后会调用 s.Proxier.SyncLoop,也属于 Proxier 的实现,这些都在 pkg/proxy/ipvs 模块里。

ipvs

以监听到增加 service 的事件为例,我们看看 Proxier 会做什么。

1
2
3
4
5
6
7
8
9
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
proxier.OnServiceUpdate(nil, service)
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}

Update 方法在 pkg/proxy/service.go 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
svc := current
if svc == nil {
svc = previous
}
if svc == nil {
return false
}
namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

sct.lock.Lock()
defer sct.lock.Unlock()

change, exists := sct.items[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = sct.serviceToServiceMap(previous)
sct.items[namespacedName] = change
}
change.current = sct.serviceToServiceMap(current)
if reflect.DeepEqual(change.previous, change.current) {
delete(sct.items, namespacedName)
}
return len(sct.items) > 0
}

实际上只是对比 Service 的变化,然后将其存在一个 Map 数据结构中,最后 OnServiceUpdate 会调用 proxier.syncRunner.Run()

1
2
3
4
5
6
func (bfr *BoundedFrequencyRunner) Run() {
select {
case bfr.run <- struct{}{}:
default:
}
}

Run 方法会发送一个信号到 BoundedFrequencyRunner 的 run 这个 channel。还记得 Run 方法的最后,我们调用了 SyncLoop 方法:

1
2
3
4
5
6
func (proxier *Proxier) SyncLoop() {
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
proxier.syncRunner.Loop(wait.NeverStop)
}

Loop 方法在 pkg/util/async/bounded_frequency_runner.go 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
glog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
glog.V(3).Infof("%s Loop stopping", bfr.name)
return
case <-bfr.timer.C():
bfr.tryRun()
case <-bfr.run:
bfr.tryRun()
}
}
}

它会不停的运行,定期地执行 tryRun 方法,当接收到 run 事件时,也会调用 tryRun 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()

if bfr.limiter.TryAccept() {
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
// ...
}

这个 tryRun 方法会调用 bfr.fn(),这个回调函数其实就是在 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) 注册的,也就是说所有的这些定期执行,通过事件触发执行的方法,最终会调用 syncProxyRules

这个方法特别长,借助设计文档里的伪代码可以帮助理解(有一些增改):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (proxier *Proxier) syncProxyRules() {
// 当服务或 endpoint 更新时,调用这个方法
// 同步 ipvs 规则或 iptables(如 mark,comment)。确保机器上有 kube-ipvs0 网卡,ipvs 需要在上面绑定地址,如果没有的话创建一个新的。确保有 ipset,用于批量设置 iptables 规则。
for svcName, svcInfo := range proxier.serviceMap {
// 处理 service 类型是 clusterIP 情况
// 从 svcInfo 中获取 ClusterIP 等信息,设置粘滞会话的类型,过期时间等信息
// 将 clusterIP 与虚拟网卡绑定,调用 libnetwork API 创建 VirtualServer
// 通过调用 syncEndpoint,更新 RealServer

// 处理 service 类型是 externalIPs 情况
// 创建一条 SNAT 规则,数据包转发给 external IP(端口转发),构建 ipvs VirtualServer,地址是 external IP
// 设置粘滞会话信息
// 调用 libnetwork API 创建 VirtualServer,RealServer

// 处理 service 类型是 load-balancer ingress 情况
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
if ingress.IP != "" {
if len(svcInfo.LoadBalancerSourceRanges) != 0 {
install specific iptables
}
// 构建 ipvs 的 VirtualServer
// 设置粘滞会话信息
// 调用 libnetwork API 创建 VirtualServer,RealServer
}
}

// 处理 service 类型是 nodeports 情况
if svcInfo.NodePort != 0 {
fall back on iptables, recruit existing iptables proxier implementation
// 用 iptable 解决
}
// 清理工作,调用 libnet API 清理不需要的 ipvs 规则,清理不需要的网卡,service 地址的绑定,清理不需要的 iptables 链规则。
}
}

以上巨长的 syncProxyRules 方法就是 kube-proxy 创建负载均衡的主要逻辑,其中 service 和 endpoint 的结果可以从之前保存的 Map 中拿到。

References